/*
 * Copyright (C) 2016, apexes.net. All rights reserved.
 * 
 *        http://www.apexes.net
 * 
 */
package net.apexes.wsonrpc.server.support;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import net.apexes.wsonrpc.core.WsonrpcConfig;
import net.apexes.wsonrpc.server.WsonrpcCloseListener;
import net.apexes.wsonrpc.server.WsonrpcMessageListener;
import net.apexes.wsonrpc.server.WsonrpcOpenListener;
import net.apexes.wsonrpc.server.WsonrpcPingListener;
import net.apexes.wsonrpc.server.WsonrpcRequestInterceptor;
import net.apexes.wsonrpc.server.WsonrpcServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Netty based WebSocket server that routes incoming WebSocket traffic on a
 * given URL path to a {@link NettyWsonrpcServerHandler}.
 *
 * <p>{@link #start()} blocks the calling thread until the listening channel is
 * closed; {@link #stop()} is meant to be called from another thread to close
 * that channel and thereby let {@code start()} return.</p>
 *
 * @author <a href="mailto:hedyn@foxmail.com">HeDYn</a>
 *
 */
public class NettyWsonrpcServer {
    
    private static final Logger LOG = LoggerFactory.getLogger(NettyWsonrpcServer.class);

    /** Frame/aggregation size limit used by the constructors that do not take one. */
    private static final int DEFAULT_MAX_FRAME_SIZE = 65536;

    private final int port;
    private final String urlPath;
    private final int maxFrameSize;
    private final NettyWsonrpcServerHandler wsonrpcHandler;
    /** Set by {@link #stop()} so that {@link #start()} can tell a requested shutdown from a failure. */
    private final AtomicBoolean isclose = new AtomicBoolean(false);
    /** Completed bind future of the listening channel; volatile because stop() reads it from another thread. */
    private volatile ChannelFuture future;
    /** Optional ping callback; volatile because it is read on Netty event loop threads. */
    private volatile WsonrpcPingListener pingListener;

    /**
     * Creates a server with the default maximum frame size and a new handler built from {@code config}.
     *
     * @param port    TCP port to bind
     * @param urlPath WebSocket path passed to the protocol handler
     * @param config  configuration handed to the new {@link NettyWsonrpcServerHandler}
     */
    public NettyWsonrpcServer(int port, String urlPath, WsonrpcConfig config) {
        this(port, urlPath, DEFAULT_MAX_FRAME_SIZE, config);
    }

    /**
     * Creates a server with a new handler built from {@code config}.
     *
     * @param port         TCP port to bind
     * @param urlPath      WebSocket path passed to the protocol handler
     * @param maxFrameSize limit used both for HTTP aggregation and for WebSocket frames
     * @param config       configuration handed to the new {@link NettyWsonrpcServerHandler}
     */
    public NettyWsonrpcServer(int port, String urlPath, int maxFrameSize, WsonrpcConfig config) {
        this(port, urlPath, maxFrameSize, new NettyWsonrpcServerHandler(config));
    }

    /**
     * Creates a server with the default maximum frame size around an existing handler.
     *
     * @param port    TCP port to bind
     * @param urlPath WebSocket path passed to the protocol handler
     * @param handler handler appended to every channel pipeline
     */
    public NettyWsonrpcServer(int port, String urlPath, NettyWsonrpcServerHandler handler) {
        this(port, urlPath, DEFAULT_MAX_FRAME_SIZE, handler);
    }

    /**
     * Creates a server around an existing handler.
     *
     * @param port         TCP port to bind
     * @param urlPath      WebSocket path passed to the protocol handler
     * @param maxFrameSize limit used both for HTTP aggregation and for WebSocket frames
     * @param handler      handler appended to every channel pipeline
     */
    public NettyWsonrpcServer(int port, String urlPath, int maxFrameSize, NettyWsonrpcServerHandler handler) {
        this.port = port;
        this.urlPath = urlPath;
        this.maxFrameSize = maxFrameSize;
        this.wsonrpcHandler = handler;
    }
    
    /**
     * @return the {@link WsonrpcServer} exposed by the underlying handler
     */
    public WsonrpcServer getWsonrpcServer() {
        return wsonrpcHandler.getWsonrpcServer();
    }

    /**
     * @return the currently registered ping listener, or {@code null} if none is set
     */
    public WsonrpcPingListener getPingListener() {
        return pingListener;
    }

    /**
     * Registers the listener notified for every received Ping frame. While no
     * listener is set ({@code null}), Ping frames are left to the superclass
     * of the protocol handler.
     *
     * @param pingListener the listener, may be {@code null}
     */
    public void setPingListener(WsonrpcPingListener pingListener) {
        this.pingListener = pingListener;
    }

    /** Delegates to the handler's open listener setter. */
    public void setWsonrpcOpenListener(WsonrpcOpenListener listener) {
        wsonrpcHandler.setWsonrpcOpenListener(listener);
    }

    /** Delegates to the handler's close listener setter. */
    public void setWsonrpcCloseListener(WsonrpcCloseListener listener) {
        wsonrpcHandler.setWsonrpcCloseListener(listener);
    }

    /** Delegates to the handler's message listener setter. */
    public void setWsonrpcMessageListener(WsonrpcMessageListener listener) {
        wsonrpcHandler.setWsonrpcMessageListener(listener);
    }

    /** Delegates to the handler's request interceptor setter. */
    public void setWsonrpcRequestInterceptor(WsonrpcRequestInterceptor interceptor) {
        wsonrpcHandler.setWsonrpcRequestInterceptor(interceptor);
    }

    /**
     * Binds the configured port and blocks until the listening channel is
     * closed (normally via {@link #stop()}) or the calling thread is
     * interrupted. Both event loop groups are shut down before this method
     * returns.
     */
    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new WsonrpcChannelInitializer())
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        LOG.debug("Starting. port={}", port);

        isclose.set(false);
        try {
            ChannelFuture bound = bootstrap.bind(port).sync();
            future = bound;
            // stop() may have run while the bind was still in progress and found no
            // channel to close; honour that request now instead of blocking forever.
            if (isclose.get()) {
                bound.channel().close();
            }
            bound.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            if (!isclose.get()) {
                LOG.error("", e);
            }
            // Restore the interrupt status so callers further up can observe it.
            Thread.currentThread().interrupt();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    /**
     * Requests shutdown by closing the listening channel, which completes the
     * close future {@link #start()} is waiting on. Safe to call before the
     * server has finished binding; in that case {@code start()} closes the
     * channel itself once the bind completes.
     */
    public void stop() {
        isclose.set(true);
        ChannelFuture bound = future;
        if (bound != null) {
            // Cancelling the (already completed) bind future has no effect; the
            // channel itself has to be closed to release start().
            bound.channel().close();
        }
    }

    /**
     * Builds the pipeline of every accepted connection: HTTP codec, HTTP
     * message aggregation, chunked writes, the WebSocket protocol handler and
     * finally the shared wsonrpc handler.
     *
     * @author <a href="mailto:hedyn@foxmail.com">HeDYn</a>
     *
     */
    private class WsonrpcChannelInitializer extends ChannelInitializer<SocketChannel> {

        @Override
        public void initChannel(SocketChannel ch) {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new HttpServerCodec());
            pipeline.addLast(new HttpObjectAggregator(maxFrameSize));
            pipeline.addLast(new ChunkedWriteHandler());
            pipeline.addLast(new WsonrpcServerProtocolHandler(urlPath, null , false, maxFrameSize));
            // NOTE(review): the same handler instance is added to every channel, which
            // presumably relies on NettyWsonrpcServerHandler being @Sharable - confirm.
            pipeline.addLast(wsonrpcHandler);
        }
    }

    /**
     * WebSocket protocol handler that, when a ping listener is registered,
     * answers Ping frames itself and reports them to that listener. All other
     * frames are passed to the superclass.
     */
    private class WsonrpcServerProtocolHandler extends WebSocketServerProtocolHandler {

        WsonrpcServerProtocolHandler(String websocketPath, String subprotocols, boolean allowExtensions, int maxFrameSize) {
            super(websocketPath, subprotocols, allowExtensions, maxFrameSize);
        }

        @Override
        protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
            // Read the field once so a concurrent setPingListener(null) cannot cause an NPE below.
            WsonrpcPingListener listener = pingListener;
            if (listener != null && frame instanceof PingWebSocketFrame) {
                ByteBuf buf = frame.content();
                byte[] bytes = new byte[buf.readableBytes()];
                // Copy without moving the reader index: the same buffer is sent back
                // in the Pong, which has to carry the Ping's payload unchanged.
                buf.getBytes(buf.readerIndex(), bytes);
                // The buffer is handed to the outgoing Pong frame, so it needs its own reference.
                buf.retain();
                ctx.channel().writeAndFlush(new PongWebSocketFrame(buf));

                listener.onPing(NettyWsonrpcServerHandler.sessionId(ctx.channel()), bytes);

                readIfNeeded(ctx);
                return;
            }
            super.decode(ctx, frame, out);
        }

        /** Requests the next read explicitly when the channel is not in auto-read mode. */
        private void readIfNeeded(ChannelHandlerContext ctx) {
            if (!ctx.channel().config().isAutoRead()) {
                ctx.read();
            }
        }
    }

}
